package org.hepeng.workx.mybatis.event.publisher;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.apache.commons.collections.CollectionUtils;
import org.hepeng.workx.mybatis.event.ExecuteEvent;
import org.hepeng.workx.mybatis.event.listener.ExecuteEventListener;
import org.hepeng.workx.util.concurrent.NamedThreadFactory;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Publisher that delivers an {@link ExecuteEvent} to every listener returned by
 * {@code getAllListener()}, scheduling each delivery on an {@link Executor}.
 *
 * <p>Every listener receives its own subscription, so each listener call is
 * submitted to the executor separately. No ordering between listeners is
 * enforced by this class.
 *
 * <p>NOTE(review): {@code subscribe} is called without an error callback, so an
 * exception thrown by a listener is not handled here; with RxJava 2 it should
 * end up in the global {@code RxJavaPlugins} error hook — confirm that this is
 * the intended failure policy.
 *
 * @author he peng
 */
public class ExecuteEventMulticastPublisher extends AbstractExecuteEventPublisher {

    /** Executor on which listener callbacks are scheduled; never {@code null}. */
    private final Executor executor;

    /**
     * Creates a publisher backed by its own pool of two threads fed from an
     * unbounded queue. Threads are produced by a {@link NamedThreadFactory}
     * named {@code "Mybatis-Event-Publisher"}.
     */
    public ExecuteEventMulticastPublisher() {
        // presumably the 'true' flag marks the threads as daemon — verify against NamedThreadFactory
        ThreadFactory threadFactory = new NamedThreadFactory("Mybatis-Event-Publisher" , true);
        this.executor = new ThreadPoolExecutor(
                2 , 2 , 0L , TimeUnit.MILLISECONDS , new LinkedBlockingQueue<>() , threadFactory);
    }

    /**
     * Creates a publisher that schedules listener callbacks on the given executor.
     *
     * @param executor executor used to run listener callbacks; must not be {@code null}
     * @throws NullPointerException if {@code executor} is {@code null}
     */
    public ExecuteEventMulticastPublisher(Executor executor) {
        // Fail fast here instead of failing later inside publishEvent.
        this.executor = Objects.requireNonNull(executor , "executor");
    }

    /**
     * Dispatches {@code event} to all currently known listeners. Does nothing
     * when there are no listeners.
     *
     * @param event the event handed to each listener's {@code onExecuteEvent}
     */
    @Override
    public void publishEvent(ExecuteEvent event) {
        Set<ExecuteEventListener> listeners = getAllListener();
        if (CollectionUtils.isEmpty(listeners)) {
            return;
        }

        Observable<ExecuteEvent> source = Observable.just(event)
                .subscribeOn(Schedulers.from(this.executor));

        // One subscription per listener: each subscribe() schedules its own task.
        for (ExecuteEventListener listener : listeners) {
            source.subscribe(emitted -> listener.onExecuteEvent(emitted));
        }
    }
}
